[[webflux-websocket]]
= WebSockets
[.small]#xref:web/websocket.adoc[See equivalent in the Servlet stack]#

This part of the reference documentation covers support for reactive-stack WebSocket
messaging.

include::partial$web/websocket-intro.adoc[leveloffset=+1]

[[webflux-websocket-server]]
== WebSocket API
[.small]#xref:web/websocket/stomp/server-config.adoc[See equivalent in the Servlet stack]#

The Spring Framework provides a WebSocket API that you can use to write client- and
server-side applications that handle WebSocket messages.



[[webflux-websocket-server-handler]]
=== Server
[.small]#xref:web/websocket/server.adoc#websocket-server-handler[See equivalent in the Servlet stack]#

To create a WebSocket server, you can first create a `WebSocketHandler`.
The following example shows how to do so:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	import org.springframework.web.reactive.socket.WebSocketHandler;
	import org.springframework.web.reactive.socket.WebSocketSession;
	import reactor.core.publisher.Mono;

	public class MyWebSocketHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			// ...
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	import org.springframework.web.reactive.socket.WebSocketHandler
	import org.springframework.web.reactive.socket.WebSocketSession
	import reactor.core.publisher.Mono

	class MyWebSocketHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {
			// ...
		}
	}
----
======

Then you can map it to a URL:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	@Configuration
	class WebConfig {

		@Bean
		public HandlerMapping handlerMapping() {
			Map<String, WebSocketHandler> map = new HashMap<>();
			map.put("/path", new MyWebSocketHandler());
			int order = -1; // before annotated controllers

			return new SimpleUrlHandlerMapping(map, order);
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	@Configuration
	class WebConfig {

		@Bean
		fun handlerMapping(): HandlerMapping {
			val map = mapOf("/path" to MyWebSocketHandler())
			val order = -1 // before annotated controllers

			return SimpleUrlHandlerMapping(map, order)
		}
	}
----
======

If you use the xref:web/webflux/dispatcher-handler.adoc#webflux-framework-config[WebFlux Config], there is
nothing further to do. Otherwise, you need to declare a `WebSocketHandlerAdapter`, as the
following example shows:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	@Configuration
	class WebConfig {

		// ...

		@Bean
		public WebSocketHandlerAdapter handlerAdapter() {
			return new WebSocketHandlerAdapter();
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	@Configuration
	class WebConfig {

		// ...

		@Bean
		fun handlerAdapter() = WebSocketHandlerAdapter()
	}
----
======



[[webflux-websockethandler]]
=== `WebSocketHandler`

The `handle` method of `WebSocketHandler` takes a `WebSocketSession` and returns a `Mono<Void>`
to indicate when application handling of the session is complete. The session is handled
through two streams, one for inbound and one for outbound messages. The following table
describes the two methods that handle the streams:

[options="header"]
|===
| `WebSocketSession` method | Description

| `Flux<WebSocketMessage> receive()`
| Provides access to the inbound message stream and completes when the connection is closed.

| `Mono<Void> send(Publisher<WebSocketMessage>)`
| Takes a source for outgoing messages, writes the messages, and returns a `Mono<Void>` that
  completes when the source completes and writing is done.

|===

A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow and
return a `Mono<Void>` that reflects the completion of that flow. Depending on application
requirements, the unified flow completes when:

* Either the inbound or the outbound message stream completes.
* The inbound stream completes (that is, the connection closed), while the outbound stream is infinite.
* At a chosen point, through the `close` method of `WebSocketSession`.

When inbound and outbound message streams are composed together, there is no need to
check whether the connection is open, because Reactive Streams signals end the activity:
the inbound stream receives a completion or error signal, and the outbound stream
receives a cancellation signal.

The most basic implementation of a handler is one that handles the inbound stream. The
following example shows such an implementation:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	class ExampleHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			return session.receive()			// <1>
					.doOnNext(message -> {
						// ...					// <2>
					})
					.concatMap(message -> {
						// ...					// <3>
					})
					.then();					// <4>
		}
	}
----
<1> Access the stream of inbound messages.
<2> Do something with each message.
<3> Perform nested asynchronous operations that use the message content.
<4> Return a `Mono<Void>` that completes when receiving completes.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	class ExampleHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {
			return session.receive()            // <1>
					.doOnNext {
						// ...					// <2>
					}
					.concatMap {
						// ...					// <3>
					}
					.then()                     // <4>
		}
	}
----
<1> Access the stream of inbound messages.
<2> Do something with each message.
<3> Perform nested asynchronous operations that use the message content.
<4> Return a `Mono<Void>` that completes when receiving completes.
======


TIP: For nested, asynchronous operations, you may need to call `message.retain()` on underlying
servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be
released before you have had a chance to read the data. For more background, see
xref:core/databuffer-codec.adoc[Data Buffers and Codecs].

The following implementation combines the inbound and outbound streams:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	class ExampleHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {

			Flux<WebSocketMessage> output = session.receive()				// <1>
					.doOnNext(message -> {
						// ...
					})
					.concatMap(message -> {
						// ...
					})
					.map(value -> session.textMessage("Echo " + value));	// <2>

			return session.send(output);									// <3>
		}
	}
----
<1> Handle the inbound message stream.
<2> Create the outbound message, producing a combined flow.
<3> Return a `Mono<Void>` that does not complete while we continue to receive.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	class ExampleHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {

			val output = session.receive()                      // <1>
					.doOnNext {
						// ...
					}
					.concatMap {
						// ...
					}
					.map { session.textMessage("Echo $it") }    // <2>

			return session.send(output)                         // <3>
		}
	}
----
<1> Handle the inbound message stream.
<2> Create the outbound message, producing a combined flow.
<3> Return a `Mono<Void>` that does not complete while we continue to receive.
======
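
As a minimal sketch of the third completion option listed earlier (closing at a chosen
point), the following handler stops reading once a client sends the text `bye` and then
closes the session. The `ClosingHandler` and `untilBye` names and the `bye` convention are
hypothetical and chosen only for illustration:

[source,java,indent=0,subs="verbatim,quotes"]
----
	import org.springframework.web.reactive.socket.CloseStatus;
	import org.springframework.web.reactive.socket.WebSocketHandler;
	import org.springframework.web.reactive.socket.WebSocketMessage;
	import org.springframework.web.reactive.socket.WebSocketSession;
	import reactor.core.publisher.Flux;
	import reactor.core.publisher.Mono;

	class ClosingHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			Flux<String> texts = session.receive()
					.map(WebSocketMessage::getPayloadAsText);
			return untilBye(texts)
					// Defer so that close is invoked only after reading has stopped.
					.then(Mono.defer(() -> session.close(CloseStatus.NORMAL)));
		}

		// Hypothetical convention: stop reading after the first "bye" message.
		static Flux<String> untilBye(Flux<String> texts) {
			return texts.takeUntil("bye"::equals);
		}
	}
----

`takeUntil` relays the matching message and then completes the stream, at which point the
returned `Mono<Void>` continues with `close`.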


Inbound and outbound streams can be independent and be joined only for completion,
as the following example shows:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	class ExampleHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {

			Mono<Void> input = session.receive()								// <1>
					.doOnNext(message -> {
						// ...
					})
					.concatMap(message -> {
						// ...
					})
					.then();

			Flux<String> source = ... ;
			Mono<Void> output = session.send(source.map(session::textMessage));	// <2>

			return Mono.zip(input, output).then();								// <3>
		}
	}
----
<1> Handle inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	class ExampleHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {

			val input = session.receive()									// <1>
					.doOnNext {
						// ...
					}
					.concatMap {
						// ...
					}
					.then()

			val source: Flux<String> = ...
			val output = session.send(source.map(session::textMessage))		// <2>

			return Mono.zip(input, output).then()							// <3>
		}
	}
----
<1> Handle inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.
======
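
Whether the joined flow completes when either stream ends or only when both have ended
depends on the operator used to join them. The following sketch uses only Reactor and
hypothetical method names (`either`, `both`). It contrasts `Mono.zip`, which completes as
soon as one source completes empty, with `Mono.when`, which waits for all sources:

[source,java,indent=0,subs="verbatim,quotes"]
----
	import reactor.core.publisher.Mono;

	class JoinExamples {

		// Completes as soon as either source completes; the other one is cancelled.
		static Mono<Void> either(Mono<Void> input, Mono<Void> output) {
			return Mono.zip(input, output).then();
		}

		// Completes only after both sources have completed.
		static Mono<Void> both(Mono<Void> input, Mono<Void> output) {
			return Mono.when(input, output);
		}
	}
----

Choosing `both` may be preferable when the outbound stream has to finish writing even
after the inbound side has completed.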



[[webflux-websocket-databuffer]]
=== `DataBuffer`

`DataBuffer` is the representation for a byte buffer in WebFlux. The Spring Core part of
the reference has more on that in the section on
xref:core/databuffer-codec.adoc[Data Buffers and Codecs]. The key point to understand is that on some
servers like Netty, byte buffers are pooled and reference counted, and must be released
when consumed to avoid memory leaks.

When running on Netty, applications must use `DataBufferUtils.retain(dataBuffer)` if they
wish to hold on to input data buffers in order to ensure they are not released, and
subsequently use `DataBufferUtils.release(dataBuffer)` when the buffers are consumed.
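
The following is a minimal sketch of that retain-then-release pairing for a handler that
reads each payload asynchronously. The `RetainingHandler` class, the `readLater` method,
and the artificial delay are assumptions made for illustration only:

[source,java,indent=0,subs="verbatim,quotes"]
----
	import java.nio.charset.StandardCharsets;
	import java.time.Duration;

	import org.springframework.core.io.buffer.DataBuffer;
	import org.springframework.core.io.buffer.DataBufferUtils;
	import org.springframework.web.reactive.socket.WebSocketHandler;
	import org.springframework.web.reactive.socket.WebSocketSession;
	import reactor.core.publisher.Mono;

	class RetainingHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			return session.receive()
					// Retain while the message is being emitted, before any async hop.
					.map(message -> DataBufferUtils.retain(message.getPayload()))
					.concatMap(RetainingHandler::readLater)
					.then();
		}

		// Hypothetical async step: reads the buffer after a delay, then releases it.
		static Mono<String> readLater(DataBuffer buffer) {
			return Mono.delay(Duration.ofMillis(10))
					.map(tick -> buffer.toString(StandardCharsets.UTF_8))
					.doFinally(signal -> DataBufferUtils.release(buffer));
		}
	}
----

On servers that do not pool buffers, both calls are no-ops, so the same code can run
unchanged there.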




[[webflux-websocket-server-handshake]]
=== Handshake
[.small]#xref:web/websocket/server.adoc#websocket-server-handshake[See equivalent in the Servlet stack]#

`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default, that is an instance
of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and
then uses `RequestUpgradeStrategy` for the server in use. Currently, there is built-in
support for Reactor Netty, Tomcat, Jetty, and Undertow.

`HandshakeWebSocketService` exposes a `sessionAttributePredicate` property that allows
setting a `Predicate<String>` to extract attributes from the `WebSession` and insert them
into the attributes of the `WebSocketSession`.
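
As a configuration sketch, the predicate could be set as follows when you declare the
`WebSocketService` yourself. The `ws.` attribute-name prefix is a hypothetical convention,
and the no-argument `HandshakeWebSocketService` constructor assumes that a supported
server is on the classpath:

[source,java,indent=0,subs="verbatim,quotes"]
----
	import org.springframework.context.annotation.Bean;
	import org.springframework.context.annotation.Configuration;
	import org.springframework.web.reactive.socket.server.WebSocketService;
	import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
	import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

	@Configuration
	class WebConfig {

		@Bean
		public WebSocketHandlerAdapter handlerAdapter() {
			return new WebSocketHandlerAdapter(webSocketService());
		}

		@Bean
		public WebSocketService webSocketService() {
			HandshakeWebSocketService service = new HandshakeWebSocketService();
			// Copy only the WebSession attributes whose names start with "ws." (hypothetical prefix).
			service.setSessionAttributePredicate(name -> name.startsWith("ws."));
			return service;
		}
	}
----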



[[webflux-websocket-server-config]]
=== Server Configuration
[.small]#xref:web/websocket/server.adoc#websocket-server-runtime-configuration[See equivalent in the Servlet stack]#

The `RequestUpgradeStrategy` for each server exposes configuration specific to the
underlying WebSocket server engine. When using the WebFlux Java config, you can customize
such properties as shown in the corresponding section of the
xref:web/webflux/config.adoc#webflux-config-websocket-service[WebFlux Config]. Otherwise, if you
do not use the WebFlux config, you can declare the following:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	@Configuration
	class WebConfig {

		@Bean
		public WebSocketHandlerAdapter handlerAdapter() {
			return new WebSocketHandlerAdapter(webSocketService());
		}

		@Bean
		public WebSocketService webSocketService() {
			TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
			strategy.setMaxSessionIdleTimeout(0L);
			return new HandshakeWebSocketService(strategy);
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	@Configuration
	class WebConfig {

		@Bean
		fun handlerAdapter() =
				WebSocketHandlerAdapter(webSocketService())

		@Bean
		fun webSocketService(): WebSocketService {
			val strategy = TomcatRequestUpgradeStrategy().apply {
				setMaxSessionIdleTimeout(0L)
			}
			return HandshakeWebSocketService(strategy)
		}
	}
----
======

Check the upgrade strategy for your server to see what options are available. Currently,
only Tomcat and Jetty expose such options.



[[webflux-websocket-server-cors]]
=== CORS
[.small]#xref:web/websocket/server.adoc#websocket-server-allowed-origins[See equivalent in the Servlet stack]#

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to
have your `WebSocketHandler` implement `CorsConfigurationSource` and return a
`CorsConfiguration` with allowed origins, headers, and other details. If you cannot do
that, you can also set the `corsConfigurations` property on the `SimpleUrlHandlerMapping` to
specify CORS settings by URL pattern. If both are specified, they are combined by using the
`combine` method on `CorsConfiguration`.
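
The first approach might look like the following sketch. The `CorsAwareHandler` name and
the `https://example.org` origin are placeholders, not values taken from this document:

[source,java,indent=0,subs="verbatim,quotes"]
----
	import org.springframework.web.cors.CorsConfiguration;
	import org.springframework.web.cors.reactive.CorsConfigurationSource;
	import org.springframework.web.reactive.socket.WebSocketHandler;
	import org.springframework.web.reactive.socket.WebSocketSession;
	import org.springframework.web.server.ServerWebExchange;
	import reactor.core.publisher.Mono;

	class CorsAwareHandler implements WebSocketHandler, CorsConfigurationSource {

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			// Placeholder handling: consume inbound messages until the connection closes.
			return session.receive().then();
		}

		@Override
		public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
			CorsConfiguration config = new CorsConfiguration();
			config.addAllowedOrigin("https://example.org"); // placeholder origin
			config.addAllowedHeader("*");
			return config;
		}
	}
----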



[[webflux-websocket-client]]
=== Client

Spring WebFlux provides a `WebSocketClient` abstraction with implementations for
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).

NOTE: The Tomcat client is effectively an extension of the standard Java one with some extra
functionality in the `WebSocketSession` handling to take advantage of the Tomcat-specific
API to suspend receiving messages for back pressure.

To start a WebSocket session, you can create an instance of the client and use its `execute`
methods:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	WebSocketClient client = new ReactorNettyWebSocketClient();

	URI url = new URI("ws://localhost:8080/path");
	client.execute(url, session ->
			session.receive()
					.doOnNext(System.out::println)
					.then());
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	val client = ReactorNettyWebSocketClient()

	val url = URI("ws://localhost:8080/path")
	client.execute(url) { session ->
		session.receive()
				.doOnNext(::println)
				.then()
	}
----
======
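
Note that `execute` returns a `Mono<Void>`, so no connection is made until that `Mono` is
subscribed to. The following sketch sends one message, prints the first reply, and blocks
until the exchange completes. The `ClientExample` class, the `hello` payload, and the
five-second timeout are assumptions for illustration, and the URL presumes a server
listening locally:

[source,java,indent=0,subs="verbatim,quotes"]
----
	import java.net.URI;
	import java.time.Duration;

	import org.springframework.web.reactive.socket.WebSocketMessage;
	import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
	import org.springframework.web.reactive.socket.client.WebSocketClient;
	import reactor.core.publisher.Mono;

	class ClientExample {

		// Sends one text message, prints the first reply, and then completes.
		static Mono<Void> exchange(WebSocketClient client, URI url) {
			return client.execute(url, session ->
					session.send(Mono.just(session.textMessage("hello")))
							.thenMany(session.receive().take(1).map(WebSocketMessage::getPayloadAsText))
							.doOnNext(System.out::println)
							.then());
		}

		public static void main(String[] args) {
			WebSocketClient client = new ReactorNettyWebSocketClient();
			// Blocking subscribes to the Mono, which triggers the connection.
			exchange(client, URI.create("ws://localhost:8080/path"))
					.block(Duration.ofSeconds(5));
		}
	}
----

In a fully reactive application, you would typically compose the returned `Mono<Void>`
into a larger pipeline rather than block on it.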

Some clients, such as Jetty, implement `Lifecycle` and need to be started before you can
use them and stopped when they are no longer needed. All clients have constructor options
related to configuration of the underlying WebSocket client.
